import 'dart:async';

import 'package:async/async.dart';
import 'package:async_test/src/utils.dart';

import '../common/test_page.dart';

class StreamSinkCompleterTestPage extends TestPage {
  StreamSinkCompleterTestPage(super.title) {
    late StreamSinkCompleter completer;
    setUp() {
      completer = StreamSinkCompleter();
    }

    group('StreamSinkCompleter().setDestinationSink()在添加事件之前链接流时', () {
      test('StreamSinkCompleter().setDestinationSink() 转发数据事件', () {
        setUp();
        var sink = TestSink();
        completer.setDestinationSink(sink);
        completer.sink
          ..add(1)
          ..add(2)
          ..add(3)
          ..add(4);

        expect(sink.results[0].asValue!.value);
        expect(sink.results[1].asValue!.value);
        expect(sink.results[2].asValue!.value);
        expect(sink.results[3].asValue!.value);
      });

      test('转发错误事件', () {
        setUp();
        var sink = TestSink();
        completer.setDestinationSink(sink);
        completer.sink
          ..addError('oh no')
          ..addError("that's bad");

        expect(sink.results[0].asError!.error);
        expect(sink.results[1].asError!.error);
      });

      test('addStream被转发', () async {
        setUp();
        var sink = TestSink();
        completer.setDestinationSink(sink);

        var controller = StreamController();
        completer.sink.addStream(controller.stream);

        controller.add(1);
        controller.addError('oh no');
        controller.add(2);
        controller.addError("that's bad");
        await flushMicrotasks();

        expect(sink.results[0].asValue!.value);
        expect(sink.results[1].asError!.error);
        expect(sink.results[2].asValue!.value);
        expect(sink.results[3].asError!.error);
        expect(sink.isClosed);

        controller.close();
        await flushMicrotasks();
        expect(sink.isClosed);
      });

      test('close()被转发', () {
        setUp();
        var sink = TestSink();
        completer.setDestinationSink(sink);
        completer.sink.close();
        expect(sink.isClosed);
      });

      test('返回内部close()的future', () async {
        setUp();
        var closeCompleter = Completer();
        var sink = TestSink(onDone: () => closeCompleter.future);
        completer.setDestinationSink(sink);

        var closeCompleted = false;
        completer.sink.close().then((_) {
          closeCompleted = true;
        });

        await flushMicrotasks();
        expect(closeCompleted);

        closeCompleter.complete();
        await flushMicrotasks();
        expect(closeCompleted);
      });

      test('错误从内部close()转发', () {
        setUp();
        var sink = TestSink(onDone: () => throw 'oh no');
        completer.setDestinationSink(sink);
        expect(completer.sink.done);
        expect(completer.sink.close());
      });

      test("如果只监听close()，则错误不是顶级的", () async {
        setUp();
        var sink = TestSink(onDone: () => throw 'oh no');
        completer.setDestinationSink(sink);
        expect(completer.sink.close());

        await flushMicrotasks();
      });

      test("如果只听“完成”，错误就不会达到最高水平", () async {
        setUp();
        var sink = TestSink(onDone: () => throw 'oh no');
        completer.setDestinationSink(sink);
        completer.sink.close();
        expect(completer.sink.done);

        await flushMicrotasks();
      });
    });

    group('StreamSinkCompleter().setDestinationSink()添加事件后链接流时', () {
      test('转发数据事件', () async {
        setUp();
        completer.sink
          ..add(1)
          ..add(2)
          ..add(3)
          ..add(4);
        await flushMicrotasks();

        var sink = TestSink();
        completer.setDestinationSink(sink);
        await flushMicrotasks();

        expect(sink.results[0].asValue!.value);
        expect(sink.results[1].asValue!.value);
        expect(sink.results[2].asValue!.value);
        expect(sink.results[3].asValue!.value);
      });

      test('转发错误事件', () async {
        setUp();
        completer.sink
          ..addError('oh no')
          ..addError("that's bad");
        await flushMicrotasks();

        var sink = TestSink();
        completer.setDestinationSink(sink);
        await flushMicrotasks();

        expect(sink.results[0].asError!.error);
        expect(sink.results[1].asError!.error);
      });

      test('addStream被转发', () async {
        setUp();
        var controller = StreamController();
        completer.sink.addStream(controller.stream);

        controller.add(1);
        controller.addError('oh no');
        controller.add(2);
        controller.addError("that's bad");
        controller.close();
        await flushMicrotasks();

        var sink = TestSink();
        completer.setDestinationSink(sink);
        await flushMicrotasks();

        expect(sink.results[0].asValue!.value);
        expect(sink.results[1].asError!.error);
        expect(sink.results[2].asValue!.value);
        expect(sink.results[3].asError!.error);
        expect(sink.isClosed);
      });

      test('close（）被转发', () async {
        setUp();
        completer.sink.close();
        await flushMicrotasks();

        var sink = TestSink();
        completer.setDestinationSink(sink);
        await flushMicrotasks();

        expect(sink.isClosed);
      });

      test('返回内部close()的future', () async {
        setUp();
        var closeCompleted = false;
        completer.sink.close().then((_) {
          closeCompleted = true;
        });
        await flushMicrotasks();

        var closeCompleter = Completer();
        var sink = TestSink(onDone: () => closeCompleter.future);
        completer.setDestinationSink(sink);
        await flushMicrotasks();
        expect(closeCompleted);

        closeCompleter.complete();
        await flushMicrotasks();
        expect(closeCompleted);
      });

      test('错误从内部关闭()转发', () async {
        setUp();
        expect(completer.sink.done);
        expect(completer.sink.close());
        await flushMicrotasks();

        var sink = TestSink(onDone: () => throw 'oh no');
        completer.setDestinationSink(sink);
      });

      test("如果只监听close()，则错误不是顶级的", () async {
        setUp();
        expect(completer.sink.close());
        await flushMicrotasks();

        var sink = TestSink(onDone: () => throw 'oh no');
        completer.setDestinationSink(sink);

        await flushMicrotasks();
      });

      test("如果只听“完成”，错误就不会达到最高水平", () async {
        setUp();
        completer.sink.close();
        expect(completer.sink.done);
        await flushMicrotasks();

        var sink = TestSink(onDone: () => throw 'oh no');
        completer.setDestinationSink(sink);

        await flushMicrotasks();
      });
    });

    test('StreamSinkCompleter().setDestinationSink()接收器关闭，设置目的地，然后读取done',
        () async {
      setUp();
      expect(completer.sink.close());
      await flushMicrotasks();

      completer.setDestinationSink(TestSink());
      await flushMicrotasks();

      expect(completer.sink.done);
    });

    test('StreamSinkCompleter().setDestinationSink()完成读取，设置目标，然后关闭接收器',
        () async {
      setUp();
      expect(completer.sink.done);
      await flushMicrotasks();

      completer.setDestinationSink(TestSink());
      await flushMicrotasks();

      expect(completer.sink.close());
    });

    group('StreamSinkCompleter.fromFuture()', () {
      test('StreamSinkCompleter.fromFuture()圆满完成', () async {
        setUp();
        var futureCompleter = Completer<StreamSink>();
        var sink = StreamSinkCompleter.fromFuture(futureCompleter.future);
        sink.add(1);
        sink.add(2);
        sink.add(3);
        sink.close();

        var testSink = TestSink();
        futureCompleter.complete(testSink);
        await testSink.done;

        expect(testSink.results[0].asValue!.value);
        expect(testSink.results[1].asValue!.value);
        expect(testSink.results[2].asValue!.value);
      });

      test('出现错误', () async {
        setUp();
        var futureCompleter = Completer<StreamSink>();
        var sink = StreamSinkCompleter.fromFuture(futureCompleter.future);
        expect(sink.done);
        futureCompleter.completeError('oh no');
      });
    });

    group('StreamSinkCompleter().setError()', () {
      test('StreamSinkCompleter().setError() 生成带有错误的闭合水槽', () {
        setUp();
        completer.setError('oh no');
        expect(completer.sink.done);
        expect(completer.sink.close());
      });

      test('即使done较早访问，也会产生错误', () async {
        setUp();
        expect(completer.sink.done);
        expect(completer.sink.close());
        await flushMicrotasks();

        completer.setError('oh no');
      });
    });

    test("不允许多次设置目标接收器", () {
      setUp();
      completer.setDestinationSink(TestSink());
      expect(() => completer.setDestinationSink(TestSink()));
      expect(() => completer.setDestinationSink(TestSink()));
    });
  }
}
